# -*- coding: utf-8 -*-
from time import time
from jms_hi.dm.dm_center_transfer_summary_new_hi import jms_dm__dm_center_transfer_summary_new_hi
from datetime import timedelta
from utils.operators.rich_sql_sensor import RichSqlSensor

#doris表名
doris_table = "dm_center_transfer_summary_new_hi"
#hive中表名
hive_table = "dm_center_transfer_summary_new_hi"
#Task ID
task_id = f"doris_{doris_table}"

label = f"doris_{doris_table}_{int(time())}"
timeout = timedelta(minutes=60).seconds
doris_jms_dm__dm_center_transfer_summary_new_hi = RichSqlSensor(
    task_id='doris_jms_dm__dm_center_transfer_summary_new_hi',
    pool='broker_load_pool',
    email=['wangmenglei@jtexpress.com','yl_bigdata@yl-scm.com'],
    conn_id='doris',
    pre_sql=f"""
    TRUNCATE TABLE jms_dm.{doris_table} PARTITION (
               p{{{{ execution_date | cst_ds_nodash }}}}
              ,p{{{{ execution_date | date_add(-1) | cst_ds_nodash }}}}
        );
    LOAD LABEL jms_dm.{label} (
          DATA INFILE(
                "hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_center_transfer_summary_new_hi/dt={{{{  execution_date | cst_ds  }}}}/sum_flag_code=zrr/*"
               ,"hdfs://{{{{ var.value.hadoop_namespace }}}}/dw/hive/jms_dm.db/external/dm_center_transfer_summary_new_hi/dt={{{{  execution_date | date_add(-1) | cst_ds  }}}}/sum_flag_code=zrr/*"
          ) 
          INTO TABLE dm_center_transfer_summary_new_hi
          FORMAT AS 'PARQUET'
        )
        WITH BROKER '{{{{ var.json.doris_brokers | random_choice }}}}'
        PROPERTIES ('timeout'='{timeout}', 'max_filter_ratio'='0.0')
        """,
    poke_sql=f"SHOW LOAD FROM jms_dm WHERE label = '{label}' ORDER BY CreateTime DESC LIMIT 1",
    sql_on_kill=f"CANCEL LOAD FROM jms_dm WHERE LABEL = '{label}'",
    success=lambda r: r[2] == 'FINISHED',
    failure=lambda r: (r[2] is not None and r[2] == 'CANCELLED', str(r[7])),
    poke_interval=60,
    execution_timeout=timedelta(seconds=timeout + 120), )

doris_jms_dm__dm_center_transfer_summary_new_hi << jms_dm__dm_center_transfer_summary_new_hi